package cn.uncode.rpc.config;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;

import cn.uncode.rpc.cluster.Cluster;
import cn.uncode.rpc.common.CommonConstant;
import cn.uncode.rpc.config.annotation.ConfigDesc;
import cn.uncode.rpc.core.Factory;
import cn.uncode.rpc.core.URL;
import cn.uncode.rpc.core.URLParam;
import cn.uncode.rpc.core.proxy.CallerInvocationHandler;
import cn.uncode.rpc.exception.ConfigException;
import cn.uncode.rpc.registry.NotifyListener;
import cn.uncode.rpc.registry.Registry;
import cn.uncode.rpc.spi.ExtensionLoader;
import cn.uncode.rpc.util.CollectionUtil;
import cn.uncode.rpc.util.NetUtil;
import cn.uncode.rpc.util.StringTools;



/**
 * 
 * caller config.
 * 
 */

public class CallerConfig<T> extends AbstractCallerConfig {
	


    private static final long serialVersionUID = -2299754608229467887L;

    private Class<T> interfaceClass;

    // 具体到方法的配置
    protected List<MethodConfig> methods;

    // 点对点直连服务提供地址
    private String directUrl;

    private AtomicBoolean initialized = new AtomicBoolean(false);

    private T ref;

    private BasicCallerConfig basicCaller;

    private List<NotifyListener<T>> clusterNotifyListeners;

    public List<MethodConfig> getMethods() {
        return methods;
    }

    public void setMethods(List<MethodConfig> methods) {
        this.methods = methods;
    }

    public void setMethods(MethodConfig methods) {
        this.methods = Collections.singletonList(methods);
    }

    public boolean hasMethods() {
        return this.methods != null && !this.methods.isEmpty();
    }

    public T getRef() {
        if (ref == null) {
            initRef();
        }
        return ref;
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public synchronized void initRef() {
        if (initialized.get()) {
            return;
        }

        try {
            interfaceClass = (Class) Class.forName(interfaceClass.getName(), true, Thread.currentThread().getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new ConfigException("ReferereConfig initRef Error: Class not found " + interfaceClass.getName(), e);
        }

        if (CollectionUtil.isEmpty(protocols)) {
            throw new ConfigException(String.format("%s RefererConfig is malformed, for protocol not set correctly!",
                    interfaceClass.getName()));
        }

        checkInterfaceAndMethods(interfaceClass, methods);

        clusterNotifyListeners = new ArrayList<NotifyListener<T>>(protocols.size());
        List<Cluster<T>> clusters = new ArrayList<Cluster<T>>(protocols.size());

        Factory factory = ExtensionLoader.getExtensionLoader(Factory.class).getExtension(CommonConstant.DEFAULT_VALUE);

        List<URL> registryUrls = loadRegistryUrls();
        String localIp = getLocalHostAddress(registryUrls);
        for (ProtocolConfig protocol : protocols) {
            Map<String, String> params = new HashMap<String, String>();
            params.put(URLParam.NODE_TYPE.getName(), CommonConstant.NODE_TYPE_REFERER);
            params.put(URLParam.VERSION.getName(), URLParam.VERSION.getValue());
            params.put(URLParam.REFRESH_TIMESTAMP.getName(), String.valueOf(System.currentTimeMillis()));

            collectConfigParams(params, protocol, basicCaller, extConfig, this);
            collectMethodConfigParams(params, this.getMethods());

            URL refUrl = new URL(protocol.getName(), localIp, CommonConstant.DEFAULT_INT_VALUE, interfaceClass.getName(), params);
            NotifyListener notifyListener = createClusterNotifyListener(refUrl, factory, registryUrls);

            clusterNotifyListeners.add(notifyListener);
            clusters.add(notifyListener.getCluster());
        }

        ref = factory.getProxy(interfaceClass, new CallerInvocationHandler<T>(interfaceClass, clusters));

        initialized.set(true);
    }

    private NotifyListener<T> createClusterNotifyListener(URL refUrl, Factory factory, List<URL> registryUrls) {
        List<URL> regUrls = new ArrayList<URL>();
        // 如果用户指定directUrls 或者 injvm协议访问，则使用local registry
        if (StringUtils.isNotBlank(directUrl) || CommonConstant.PROTOCOL_INJVM.equals(refUrl.getProtocol())) {
            URL regUrl =
                    new URL(CommonConstant.REGISTRY_PROTOCOL_LOCAL, NetUtil.LOCALHOST, CommonConstant.DEFAULT_INT_VALUE,
                            Registry.class.getName());
            if (StringUtils.isNotBlank(directUrl)) {
                StringBuilder duBuf = new StringBuilder(128);
                String[] dus = CommonConstant.COMMA_SPLIT_PATTERN.split(directUrl);
                for (String du : dus) {
                    if (du.contains(":")) {
                        String[] hostPort = du.split(":");
                        URL durl = refUrl.createCopy();
                        durl.setHost(hostPort[0].trim());
                        durl.setPort(Integer.parseInt(hostPort[1].trim()));
                        durl.addParameter(URLParam.NODE_TYPE.getName(), CommonConstant.NODE_TYPE_SERVICE);
                        duBuf.append(StringTools.urlEncode(durl.toFullStr())).append(CommonConstant.COMMA_SEPARATOR);
                    }
                }
                if (duBuf.length() > 0) {
                    duBuf.deleteCharAt(duBuf.length() - 1);
                    regUrl.addParameter(URLParam.DIRECT_URL.getName(), duBuf.toString());
                }
            }
            regUrls.add(regUrl);
        } else { // 通过注册中心配置拼装URL，注册中心可能在本地，也可能在远端
            if (registryUrls == null || registryUrls.isEmpty()) {
                throw new IllegalStateException(
                        String.format(
                                "No registry to reference %s on the consumer %s , please config <motan:registry address=\"...\" /> in your spring config.",
                                interfaceClass, NetUtil.LOCALHOST));
            }
            for (URL url : registryUrls) {
                regUrls.add(url.createCopy());
            }
        }

        for (URL url : regUrls) {
            url.addParameter(URLParam.EMBED.getName(), StringTools.urlEncode(refUrl.toFullStr()));
        }
        return factory.getClusterSupport(interfaceClass, regUrls);
    }

    public synchronized void destroy() {
        if (clusterNotifyListeners != null) {
            for (NotifyListener<T> notifyListener : clusterNotifyListeners) {
            	notifyListener.destroy();
            }
        }
        ref = null;
        initialized.set(false);
    }

    public void setInterface(Class<T> interfaceClass) {
        if (interfaceClass != null && !interfaceClass.isInterface()) {
            throw new IllegalStateException("The interface class " + interfaceClass + " is not a interface!");
        }
        this.interfaceClass = interfaceClass;
    }

    public Class<?> getInterface() {
        return interfaceClass;
    }

    public String getDirectUrl() {
        return directUrl;
    }

    public void setDirectUrl(String directUrl) {
        this.directUrl = directUrl;
    }

    @ConfigDesc(excluded = true)
    public BasicCallerConfig getBasicCaller() {
        return basicCaller;
    }

    public void setBasicReferer(BasicCallerConfig basicCaller) {
        this.basicCaller = basicCaller;
    }

    public List<NotifyListener<T>> getNotifyListeners() {
        return clusterNotifyListeners;
    }

    public AtomicBoolean getInitialized() {
        return initialized;
    }



}
